跳到主要内容

Java Netty 事件循环对象 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

EventLoop 接口

EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。

它的继承关系如下

继承自 j.u.c.ScheduledExecutorService 因此 包含了线程池中所有的方法 继承自 netty 自己的 OrderedEventExecutor

  • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
  • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)

继承自 Netty 自己的 EventExecutorGroup

  • 实现了 Iterable 接口提供遍历 EventLoop 的能力
  • 另有 next 方法获取集合中下一个 EventLoop

EventLoopGroup 和 EventLoop 关系图

下图展示了 Channel、EventLoop 、Thread 以及 EventLoopGroup 之间的关系。

这些关系是:

  • 一个 EventLoopGroup 包含一个或者多个 EventLoop
  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
  • 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop
  • 一个 EventLoop 可能会被分配给一个或多个 Channel。

注意,在这种设计中,一个给定 Channel 的 I/O 操作都是由相同的 Thread 执行的,实际上消除了对于同步的需要。

处理普通与定时任务

前面说过,这个 EventLoop 内部就是继承自 ScheduledExecutorService 因此 包含了线程池中所有的方法,这里编写一个测试样例:

public class TestEventLoop {
public static void main(String[] args) {
// 创建拥有两个 EventLoop 的 NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过 next 方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());

// 通过 EventLoop 执行普通任务
group.next().execute(() -> {
System.out.println(Thread.currentThread().getName() + " hello");
});

// 通过 EventLoop 执行定时任务
group.next().scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);

// 优雅地关闭
group.shutdownGracefully();
}
}

输出:

io.netty.channel.nio.NioEventLoop@19e1023e
io.netty.channel.nio.NioEventLoop@7cef4e59
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2

关闭 EventLoopGroup

优雅关闭 shutdownGracefully 方法。

该方法会首先切换 EventLoopGroup 到关闭状态从而 拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

处理 IO 任务

再来看下 Pipeline 与 EventLoop 的关系,一个 Channel 在它的生命周期内只注册于一个 EventLoop,而一个 Pipeline 与一个 Channel 绑定

服务器代码

public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));

}
});
}
})
.bind(8080);
}
}

一个很普通的用法,创建一个 ChannelInboundHandlerAdapter 入站处理器

补充:

Handler 就是一个处理器,它有上图所示的两种,主要就是出站处理器,和入站处理器,而 Pipeline 则是编排这些 Handler(Pipeline 会自动取得这两种处理器)的管道,消息就是按照 Pipeline 编排的顺序在一个个 Handler 之间流动

它们的关系如图所示:

再介绍一下它们之间是怎么联系的

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:" + in.toString(Charset.defaultCharset()));
// 再次将接收到的消息发回给发送者,而不冲刷出站消息
ctx.write(in);
}

ChannelHandlerContext 用于管理它所关联的 ChannelHandler 和同一个 ChannelPipeline 中的下一个 ChannelHandler 的交互(每当有 handler 添加到 Pipeline 时,都会创建 context,创建之后 context 和 handler 的关系永远都不会变,因而可以缓存 context 的引用),如果事件从 channel 或者 channelpipeline 上触发将沿整个 pipeline 传播,但是 context 上的相同触发方式只会传递给 pipeline 上的下一个能够处理的 handler

客户端代码

public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.in.read();
}
}

这里给这个 ChannelPipeline 注册一个 StringEncoder 编码器

增加 Worker Group

Bootstrap 的 group() 方法可以传入两个 EventLoopGroup 参数,分别负责处理不同的事件

如下:服务器端一个 Boss Group 接受 Accept 事件与两个 Worker Group 工人负责读写事件

new ServerBootstrap()
// 两个Group,分别为:
// * Boss:负责Accept事件,
// * Worker:负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
}).bind(8080).sync();

客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)

public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();

channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));

最后输出

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan       
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu

可以看出,可以看到两个工人轮流处理 channel,且 EventLoop 一旦与 Channel 绑定,则一直负责处理该 Channel 中的事件

增加自定义 EventLoopGroup

当有的任务需要 较长的时间处理时,可以使用非 NioEventLoopGroup,避免同一个 NioEventLoop 中的其他 Channel 在较长的时间内都无法得到处理

public class MyServer {
public static void main(String[] args) {
// 增加自定义的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();

new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加两个 handler,第一个使用 NioEventLoopGroup处理,第二个使用自定义 EventLoopGroup 处理
socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
// 调用下一个handler
ctx.fireChannelRead(msg);
}
})
// 该 handler 绑定自定义的 Group
.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}

启动四个客户端发送数据(同上的操作)

nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

可以看出,客户端与服务器之间的事件,被 nioEventLoopGroup 和 defaultEventLoopGroup 分别处理

切换 Group 的原理

由上面的图可以看出,当 handler 中绑定的 Group 不同时,需要切换 Group 来执行不同的任务

这个切换的关键代码位于: io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

不同的 EventLoopGroup 切换的实现原理如下:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();

// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用